package mqtt.manager;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

/**
 * MQTT管理器
 *
 * @Author: Lamb
 * @Date: 2022/9/23 15:24
 */
public class MqttManager {

    private MqttPahoMessageDrivenChannelAdapter adapter;

//    @Autowired
//    private ProdTopicMapper proTopicMapper;

    // 根据方法参数名称依赖注入（jdk1.8+）
    @Autowired
    public void setReceiverMessageProducer(MessageProducer receiverMessageProducer) {
        adapter = (MqttPahoMessageDrivenChannelAdapter) receiverMessageProducer;
//        proTopicMapper.selectList(null).stream().forEach(item->{
//            HashSet<String> topics = new HashSet(Arrays.asList(adapter.getTopic()));
//            if (!topics.contains(item.getTopic())) {
//                adapter.addTopic(item.getTopic());
//            }
//        });;
//        log.debug("adapter --- -> " + adapter);
        //adapter.addTopic("test");
    }

    /**
     * 添加topic
     *
     * @param topic
     */
    public void addTopic(String topic) {
        adapter.addTopic(topic);
    }

    public void addTopics(String[] topics) {
        adapter.addTopic(topics);
    }

    /**
     * 移除topic
     *
     * @param topic
     */
    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

    /**
     * 获取当前订阅的主题
     *
     * @return
     */
    public String[] getTopics() {
        return adapter.getTopic();
    }

    /**
     * 获取当前订阅主题的QOS
     *
     * @return
     */
    public int[] getQoss() {
        return adapter.getQos();
    }


}
